{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "f48d0afd",
   "metadata": {},
   "source": [
    "# First Steps"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fd434418",
   "metadata": {},
   "source": [
    "## Creating a simple Kafka consumer app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1482b8b9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import os\n",
    "import platform\n",
    "\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4a18cdd7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "# | notest\n",
    "\n",
    "import nest_asyncio"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "75545b26",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "# | notest\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1073bb3b",
   "metadata": {},
   "source": [
    "For our first demo we will create the simplest possible Kafka consumer and run it using 'fastkafka run' command.\n",
    "\n",
    "The consumer will:\n",
    "\n",
    "1. Connect to the Kafka Broker we setup in the Intro guide\n",
    "\n",
    "2. Listen to the hello topic\n",
    "\n",
    "3. Write any message received from the hello topic to stdout\n",
    "    \n",
    "To create the consumer, first, create a file named <b>hello_kafka_consumer.py</b> and copy the following code to it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "825bf08a",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from os import environ\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "kafka_server_url = environ[\"KAFKA_HOSTNAME\"]\n",
       "kafka_server_port = environ[\"KAFKA_PORT\"]\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"localhost\": {\n",
       "        \"description\": \"local development kafka\",\n",
       "        \"url\": kafka_server_url,\n",
       "        \"port\": kafka_server_port\n",
       "    }\n",
       "}\n",
       "\n",
       "class HelloKafkaMsg(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_app = FastKafka(\n",
       "    kafka_brokers=kafka_brokers\n",
       ")\n",
       "    \n",
       "@kafka_app.consumes()\n",
       "async def on_hello(msg: HelloKafkaMsg):\n",
       "    print(f\"Got data, msg={msg.msg}\", flush=True)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "consumer_script = \"\"\"\n",
    "from os import environ\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "from pydantic import BaseModel, Field\n",
    "\n",
    "kafka_server_url = environ[\"KAFKA_HOSTNAME\"]\n",
    "kafka_server_port = environ[\"KAFKA_PORT\"]\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"description\": \"local development kafka\",\n",
    "        \"url\": kafka_server_url,\n",
    "        \"port\": kafka_server_port\n",
    "    }\n",
    "}\n",
    "\n",
    "class HelloKafkaMsg(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    kafka_brokers=kafka_brokers\n",
    ")\n",
    "    \n",
    "@kafka_app.consumes()\n",
    "async def on_hello(msg: HelloKafkaMsg):\n",
    "    print(f\"Got data, msg={msg.msg}\", flush=True)\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{consumer_script}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0845837e",
   "metadata": {},
   "source": [
    "!!! info \\\"Kafka configuration\\\"\n",
    "\n",
    "    This consumer script uses KAFKA_HOSTNAME and KAFKA_PORT environment vars, so make sure that you have exported them into your environment before running the following comand (e.g. in shell, for KAFKA_HOSTNAME, run: 'export KAFKA_HOSTNAME=kafka')."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6b95e812",
   "metadata": {},
   "source": [
    "!!! warning \\\"Remember to flush\\\"\n",
    "\n",
    "    Notice the **flush=True** option when using print in our consumer. This is because standard python print function doesn't flush by default. To be able to log the worker outputs in real time when using fastkafka run command, the outputs need to be flushed, they will be logged when stopping the worker otherwise."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3efde628",
   "metadata": {},
   "source": [
    "To run this consumer, in your terminal, run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9b5100a2",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker localhost hello_kafka_consumer:kafka_app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "consumer_cmd = \"fastkafka run --num-workers=1 --kafka-broker localhost hello_kafka_consumer:kafka_app\"\n",
    "\n",
    "md(f\"```shell\\n{consumer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9f3770bc",
   "metadata": {},
   "source": [
    "After running the command, you should see something similar to the ouput below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6e9ede31",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[878412]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
      "[878412]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello'})\n",
      "[878412]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello'}\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
      "[878412]: [WARNING] aiokafka.cluster: Topic hello is not available during auto-create initialization\n",
      "[878412]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello': 0}. \n",
      "Starting process cleanup, this may take a few seconds...\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 878412...\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
      "[878412]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Process 878412 terminated.\n",
      "\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 877951...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 877951 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 877579...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 877579 terminated.\n"
     ]
    }
   ],
   "source": [
    "# | echo: false\n",
    "async with ApacheKafkaBroker() as bootstrap_server:\n",
    "    os.environ[\"KAFKA_HOSTNAME\"], os.environ[\"KAFKA_PORT\"] = bootstrap_server.split(\":\")\n",
    "\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=consumer_script,\n",
    "        script_file=\"hello_kafka_consumer.py\",\n",
    "        cmd=consumer_cmd,\n",
    "        cancel_after=10,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, output.decode(\"utf-8\")\n",
    "    print(output.decode(\"utf-8\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "29f4df11",
   "metadata": {},
   "source": [
    "Now you can interact with your consumer, by sending the messages to the subscribed 'hello' topic, don't worry, we will cover this in the next step of this guide."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d28903bc",
   "metadata": {},
   "source": [
    "## Sending first message to your consumer"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62de2731",
   "metadata": {},
   "source": [
    "After we have created and run our first consumer, we should send a message to it, to make sure it is working properly.\n",
    "\n",
    "If you are using the Kafka setup as described in the Intro guide, you can follow the steps listed here to send a message to the hello topic.\n",
    "\n",
    "First, connect to your running kafka broker by running:\n",
    "\n",
    "``` shell\n",
    "docker run -it kafka /bin/bash\n",
    "```\n",
    "\n",
    "Then, when connected to the container, run:\n",
    "\n",
    "``` shell\n",
    "kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=hello\n",
    "```\n",
    "\n",
    "This will open an interactive connection to the hello topic, now you can write your mesages to the topic and they will be consumed by our consumer.\n",
    "\n",
    "In the shell, type:\n",
    "``` shell\n",
    "{\"msg\":\"hello\"}\n",
    "```\n",
    "and press enter. This will send a hello message to the topic which will be read by our running consumer and outputed to stdout.\n",
    "\n",
    "Check the output of your consumer (terminal where you ran the 'fastkafka run' command) and confirm that your consumer has read the Kafka message. You shoud see something like this:\n",
    "``` shell\n",
    "Got data, msg=hello\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fc013025",
   "metadata": {},
   "source": [
    "## Creating a hello Kafka producer"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a65e725f",
   "metadata": {},
   "source": [
    "Consuming messages is only a part of this Library functionality, the other big part is producing the messages. So, let's create our first kafka producer which will send it's greetings to our consumer periodically.\n",
    "\n",
    "The producer will:\n",
    "\n",
    "1. Connect to the Kafka Broker we setup in the Intro guide\n",
    "2. Connect to the hello topic\n",
    "3. Periodically send a message to the hello world topic\n",
    "    \n",
    "To create the producer, first, create a file named <b>hello_kafka_producer.py</b> and copy the following code to it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4c3c5876",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from os import environ\n",
       "\n",
       "import asyncio\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "from fastkafka._components.logger import get_logger\n",
       "\n",
       "kafka_server_url = environ[\"KAFKA_HOSTNAME\"]\n",
       "kafka_server_port = environ[\"KAFKA_PORT\"]\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"localhost\": {\n",
       "        \"description\": \"local development kafka\",\n",
       "        \"url\": kafka_server_url,\n",
       "        \"port\": kafka_server_port\n",
       "    }\n",
       "}\n",
       "\n",
       "class HelloKafkaMsg(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_app = FastKafka(\n",
       "    kafka_brokers=kafka_brokers\n",
       ")\n",
       "\n",
       "logger = get_logger(__name__)\n",
       "\n",
       "@kafka_app.produces()\n",
       "async def to_hello(msg: HelloKafkaMsg) -> HelloKafkaMsg:\n",
       "    logger.info(f\"Producing: {msg}\")\n",
       "    return msg\n",
       "\n",
       "@kafka_app.run_in_background()\n",
       "async def hello_every_second():\n",
       "    while(True):\n",
       "        await to_hello(HelloKafkaMsg(msg=\"hello\"))\n",
       "        await asyncio.sleep(1)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "producer_script = \"\"\"\n",
    "from os import environ\n",
    "\n",
    "import asyncio\n",
    "from pydantic import BaseModel, Field\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "from fastkafka._components.logger import get_logger\n",
    "\n",
    "kafka_server_url = environ[\"KAFKA_HOSTNAME\"]\n",
    "kafka_server_port = environ[\"KAFKA_PORT\"]\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"description\": \"local development kafka\",\n",
    "        \"url\": kafka_server_url,\n",
    "        \"port\": kafka_server_port\n",
    "    }\n",
    "}\n",
    "\n",
    "class HelloKafkaMsg(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    kafka_brokers=kafka_brokers\n",
    ")\n",
    "\n",
    "logger = get_logger(__name__)\n",
    "\n",
    "@kafka_app.produces()\n",
    "async def to_hello(msg: HelloKafkaMsg) -> HelloKafkaMsg:\n",
    "    logger.info(f\"Producing: {msg}\")\n",
    "    return msg\n",
    "\n",
    "@kafka_app.run_in_background()\n",
    "async def hello_every_second():\n",
    "    while(True):\n",
    "        await to_hello(HelloKafkaMsg(msg=\"hello\"))\n",
    "        await asyncio.sleep(1)\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{producer_script}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f4d3eb5f",
   "metadata": {},
   "source": [
    "!!! info \\\"Kafka configuration\\\"\n",
    "\n",
    "    This producer script uses KAFKA_HOSTNAME and KAFKA_PORT environment vars, so make sure that you have exported them into your environment before running the following comand (e.g. in shell, for KAFKA_HOSTNAME, run: 'export KAFKA_HOSTNAME=kafka')."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f310d1cd",
   "metadata": {},
   "source": [
    "To run this producer, in your terminal, run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "070cd807",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker localhost hello_kafka_producer:kafka_app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "producer_cmd = \"fastkafka run --num-workers=1 --kafka-broker localhost hello_kafka_producer:kafka_app\"\n",
    "\n",
    "md(f\"```shell\\n{producer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8de248bc",
   "metadata": {},
   "source": [
    "After running the command, you should see something similar to the ouput below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3cf137c1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[879272]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'hello_every_second' as background task\n",
      "[879272]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'\n",
      "[879272]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "[879272]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'hello_every_second'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [WARNING] aiokafka.cluster: Topic hello is not available during auto-create initialization\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "Starting process cleanup, this may take a few seconds...\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 879272...\n",
      "[879272]: [INFO] hello_kafka_producer: Producing: msg='hello'\n",
      "[879272]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'hello_every_second'\n",
      "[879272]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'hello_every_second' to finish\n",
      "[879272]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'hello_every_second'\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Process 879272 terminated.\n",
      "\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 878808...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 878808 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 878435...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 878435 terminated.\n"
     ]
    }
   ],
   "source": [
    "# | echo: false\n",
    "async with ApacheKafkaBroker() as bootstrap_server:\n",
    "    os.environ[\"KAFKA_HOSTNAME\"], os.environ[\"KAFKA_PORT\"] = bootstrap_server.split(\":\")\n",
    "\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=producer_script,\n",
    "        script_file=\"hello_kafka_producer.py\",\n",
    "        cmd=producer_cmd,\n",
    "        cancel_after=10,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, output.decode(\"utf-8\")\n",
    "    print(output.decode(\"utf-8\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0b869e2b",
   "metadata": {},
   "source": [
    "Now, while the producer is running, it will send a HelloKafkaMsg every second to the hello kafka topic.\n",
    "If your consumer is still running, you should see the messages appear in its log."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1286a108",
   "metadata": {},
   "source": [
    "## Recap"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f3409cc2",
   "metadata": {},
   "source": [
    "In this guide we have:\n",
    "    \n",
    "1. Created a simple Kafka consumer using FastKafka\n",
    "2. Sent a message to our consumer trough Kafka\n",
    "3. Created a simple Kafka producer using FastKafka"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
